package com.ws.framework.zk;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

/**
 * Holder for a single {@link CuratorFramework} connection to ZooKeeper.
 *
 * <p>The underlying framework is built and started when this object is
 * constructed. {@link #getClient()} returns it, and lazily builds a new one if
 * {@link #close()} has been called in the meantime. Both of those methods are
 * {@code synchronized} on this instance.
 *
 * <p>Each supplied {@code Auther} is turned into a {@code "digest"}
 * {@link AuthInfo} whose payload is {@code username:password} encoded as UTF-8.
 *
 * @author WSH
 */
public class CuratorClient {

	/** Session timeout used by the constructors that do not take one explicitly. */
	private static final int DEFAULT_SESSION_TIMEOUT_MS = 10000;

	/** Authorization scheme attached to every generated {@link AuthInfo}. */
	private static final String DIGEST_SCHEME = "digest";

	/** Separator placed between user name and password in the auth payload. */
	private static final String CREDENTIAL_SEPARATOR = ":";

	private final Logger logger = Logger.getLogger(this.getClass().getName());

	// NOTE(review): Curator's ExponentialBackoffRetry appears to cap maxRetries
	// internally, so Integer.MAX_VALUE probably does not mean "retry forever" —
	// confirm the intended retry behaviour against the Curator documentation.
	private final RetryPolicy policy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE);

	private final int sessionTimeoutMs;

	private final String connectStr;

	private final List<AuthInfo> authInfos;

	/** Currently active framework; {@code null} after {@link #close()}. */
	private CuratorFramework client;

	/**
	 * Connects without credentials, using the default session timeout.
	 *
	 * @param connectStr ZooKeeper connection string
	 * @throws IllegalArgumentException if {@code connectStr} is null or blank
	 */
	public CuratorClient(String connectStr) {
		this(connectStr, Collections.<Auther> emptyList());
	}

	/**
	 * Connects with the given credentials, using the default session timeout.
	 *
	 * @param connectStr ZooKeeper connection string
	 * @param authers    credentials to register; {@code null} is treated as empty
	 * @throws IllegalArgumentException if {@code connectStr} is null or blank
	 */
	public CuratorClient(String connectStr, List<Auther> authers) {
		this(DEFAULT_SESSION_TIMEOUT_MS, connectStr, authers);
	}

	/**
	 * Builds and starts the underlying {@link CuratorFramework}.
	 *
	 * @param sessionTimeoutMs session timeout in milliseconds passed to the builder
	 * @param connectStr       ZooKeeper connection string
	 * @param authers          credentials to register; {@code null} is treated as empty
	 * @throws IllegalArgumentException if {@code connectStr} is null or blank
	 */
	public CuratorClient(int sessionTimeoutMs, String connectStr, List<Auther> authers) {
		if (null == connectStr || connectStr.trim().isEmpty()) {
			throw new IllegalArgumentException("Invalid Connection Host " + connectStr);
		}
		this.sessionTimeoutMs = sessionTimeoutMs;
		this.connectStr = connectStr;
		this.authInfos = transfer(authers);
		init();
	}

	/**
	 * Converts the caller's credentials into Curator {@link AuthInfo} entries.
	 *
	 * @param authers credentials, possibly {@code null}
	 * @return a new mutable list with one entry per credential, never {@code null}
	 */
	private List<AuthInfo> transfer(List<Auther> authers) {
		List<AuthInfo> converted = new ArrayList<AuthInfo>();
		if (null == authers) {
			return converted;
		}
		for (Auther auther : authers) {
			String credential = auther.getUsername() + CREDENTIAL_SEPARATOR + auther.getPassword();
			// Fix the charset explicitly so the payload does not depend on the
			// platform default encoding of the JVM this code runs on.
			converted.add(new AuthInfo(DIGEST_SCHEME, credential.getBytes(StandardCharsets.UTF_8)));
		}
		return converted;
	}

	/**
	 * Builds a framework from the stored settings, registers the state logger and
	 * starts it. The {@code client} field is only updated once the start call has
	 * returned normally, so a failed start never leaves an unstarted instance
	 * behind for {@link #getClient()} to hand out.
	 */
	private void init() {
		CuratorFramework framework = CuratorFrameworkFactory.builder().sessionTimeoutMs(sessionTimeoutMs)
				.retryPolicy(policy).connectString(connectStr).authorization(authInfos).build();
		framework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
			@Override
			public void stateChanged(CuratorFramework source, ConnectionState state) {
				if (state == ConnectionState.LOST) {
					logger.log(Level.WARNING, "zookeeper失去链接");
				}
				if (state == ConnectionState.CONNECTED) {
					logger.log(Level.INFO, "zookeeper链接成功");
				}
				if (state == ConnectionState.RECONNECTED) {
					logger.log(Level.INFO, "zookeeper重新链接成功");
				}
			}
		});
		boolean started = false;
		try {
			framework.start();
			started = true;
		} finally {
			if (!started) {
				// Release whatever the failed instance holds; the exception still propagates.
				CloseableUtils.closeQuietly(framework);
			}
		}
		client = framework;
	}

	/**
	 * Returns the active framework, creating and starting a new one if the
	 * previous instance was released through {@link #close()}.
	 *
	 * @return the started framework
	 */
	public synchronized CuratorFramework getClient() {
		if (null == client) {
			init();
		}
		return client;
	}

	/**
	 * Closes the active framework, if any, and forgets it. Calling this when
	 * nothing is open is a no-op.
	 */
	public synchronized void close() {
		if (null != client) {
			CloseableUtils.closeQuietly(client);
			client = null;
		}
	}
}